Scalaで並行処理#4 – AkkaのFault Tolerance(耐障害性)
Supervisorによる耐障害性
Akkaでのエラーや障害に対する対処は「let it crash」というアプローチをとっています。 この考え方は障害は発生しうるものという前提に基づき、障害が発生したらプロセスを適切に再スタートさせるという考え方です。 そのためにAkkaではSupervisorとよばれるものがActorを管理し、監視対象のActorに障害が発生した際、 プロセスを再スタートさせ、プロセスを生かし続けます。こうしてAkkaは耐障害性を確保しています。
Supervisorが行う再スタート方法
AkkaのSupervisorは二種類の再スタート方法を持っています。 1.One-For-One 管理対象のうちいずれかのプロセスがクラッシュしたら、その対象のコンポーネントのみを再スタートさせます。 2.All-For-One Supervisorも含む、管理対象のうちいずれかのプロセスがクラッシュしたら、管理しているすべてのコンポーネントを再スタートさせます。
とりあえずサンプルを作成してためしてみましょう。
今回使用した動作環境は以下のとおりです。
- OS : MacOS X 10.7.2
- Java : 1.6.0_26
- Scala : 2.9.1 final
- SBT : 0.11.2
実行環境のセットアップ
#2の記事で作成したAkkaサンプルを使用します。その記事を参考にプロジェクトを作成しておきましょう。
サンプル実行
AllForOne(1つがエラーになったら全部再スタートする)を試してみましょう。 MyAkka.scalaの中身を下記のようにしてください。
import akka.actor.Actor import akka.actor.Actor._ import akka.actor.Supervisor import akka.config.Supervision._ /** * FirstErrorActor用のエラーフラグ */ object Flag { var isError = true } /** * 1回目はエラーになるアクター */ class FirstErrorActor extends Actor { self.start() def receive = { case msg => { println(msg + " by FirstErrorActor") if(Flag.isError) throw new Exception("FirstErrorActor Error") println("FirstErrorActor receive done.") } } override def preRestart(reason: Throwable) = { println("FirstErrorActor pre Restart.") Flag.isError = false } override def postRestart(reason: Throwable) = { println("FirstErrorActor post Restart.") } } /** * 通常のアクター */ class NomalActor extends Actor { self.start() def receive = { case msg => { println(msg + " by NomalActor") } } override def preRestart(reason: Throwable) = { println("NomalActor pre Restart.") } override def postRestart(reason: Throwable) = { println("NomalActor post Restart.") } } object Main extends App { val myAkkaActor1 = actorOf(new FirstErrorActor()) val myAkkaActor2 = actorOf(new NomalActor()) val supervisor = Supervisor( SupervisorConfig( AllForOneStrategy(List(classOf[Exception]), 3, 1000), Supervise(myAkkaActor1 , Permanent) :: Supervise(myAkkaActor2 , Permanent) :: Nil ) ).start println("-----NomalActor receive-----") myAkkaActor2 ! "success!" Thread.sleep(5000) println("-----FirstErrorActor receive(Error)-----") myAkkaActor1 ! "error!" Thread.sleep(5000) println("-----FirstErrorActor receive(Success)-----") myAkkaActor1 ! "success!" }
まずはアクターの説明です。必要なパッケージをimportしたら、2つのアクターを定義します。 FirstErrorActorは1度目の実行では例外を投げるアクターです。 NomalActorはそのまま問題なく実行可能なアクターです。
/** * 1回目はエラーになるアクター */ class FirstErrorActor extends Actor { self.start() def receive = { case msg => { println(msg + " by FirstErrorActor") if(Flag.isError) throw new Exception("FirstErrorActor Error") println("FirstErrorActor receive done.") } } override def preRestart(reason: Throwable) = { println("FirstErrorActor pre Restart.") Flag.isError = false } override def postRestart(reason: Throwable) = { println("FirstErrorActor post Restart.") } } /** * 通常のアクター */ class NomalActor extends Actor { self.start() def receive = { case msg => { println(msg + " by NomalActor") } } override def preRestart(reason: Throwable) = { println("NomalActor pre Restart.") } override def postRestart(reason: Throwable) = { println("NomalActor post Restart.") } }
それぞれのアクターがオーバーライドしているpreRestartとpostRestartは、Supervisorによって再スタートする前/後に実行されます。
Mainクラスでは2つのアクターを作成したあと、Supervisorに登録しています。
val supervisor = Supervisor( SupervisorConfig( AllForOneStrategy(List(classOf[Exception]),//監視対象の例外 3, //再スタートリトライ回数 1000),//指定時間 Supervise(myAkkaActor1 , Permanent) :: Supervise(myAkkaActor2 , Permanent) :: Nil) ).start
SupervisorConfigでは第1引数にAllForOneStrategy(1つがエラーになったら全部再起動)を指定します。 AllForOneStrategyの第4引数以降はSuperviseを指定します。 Superviseの第1引数にアクターのインスタンスを指定し、第2引数にPermanent(常に再起動)を指定します。 これをListでつなげてSupervisorに渡せば設定が完了します。
設定が完了したら、NomalActor,FirstErrorActor,FirstErrorActorの順番にメッセージを送ります。
println("-----NomalActor receive-----") myAkkaActor2 ! "success!" Thread.sleep(5000) println("-----FirstErrorActor receive(Error)-----") myAkkaActor1 ! "error!" Thread.sleep(5000) println("-----FirstErrorActor receive(Success)-----") myAkkaActor1 ! "success!"
実行結果下記のようになるはずです。
> run [info] Running Main -----NomalActor receive----- success! by NomalActor -----FirstErrorActor receive(Error)----- error! by FirstErrorActor NomalActor pre Restart. NomalActor post Restart. [ERROR] [12/01/31 17:59] [akka:event-driven:dispatcher:global-2] [LocalActorRef] FirstErrorActor Error java.lang.Exception: FirstErrorActor Error at FirstErrorActor$$anonfun$receive$1.apply(MyAkka.scala:24) at FirstErrorActor$$anonfun$receive$1.apply(MyAkka.scala:21) at akka.actor.Actor$class.apply(Actor.scala:545) at FirstErrorActor.apply(MyAkka.scala:16) ・・・・・・ ・・・・・・ FirstErrorActor pre Restart. FirstErrorActor post Restart. -----FirstErrorActor receive(Success)----- success! by FirstErrorActor FirstErrorActor receive done.
FirstErrorActorの1回目の実行でエラーが起こった後、NomalActorの再スタートが行われているのがわかります。 その後FirstErrorActorも再スタート処理が行われ、2回目は問題なく処理が終了しています。
まとめ
今回はAkkaのFault Tolerance機能に少しだけふれて見ました。 簡単なサンプルでしたが、この機能を使えばエラー発生時のリカバリ処理が比較的容易に記述できますね。
参考サイトなど
- Scalaで並行処理#1 - Actorを使う:https://dev.classmethod.jp/server-side/scala-actor/
- Scalaで並行処理#2 - AkkaのActorを使う: https://dev.classmethod.jp/etc/scala-akka/
- Scalaで並行処理#3 - Akkaのconfigファイル:https://dev.classmethod.jp/server-side/actor3-config/